前言
在rabbitmq中支持两种消息处理的模式,一种是订阅模式(也叫push模式),由broker主动将消息推送给订阅队列的消费者;另一种是检索模式(也叫pull模式),需要消费者调用channel.basicGet方法,主动从队列中拉取消息。下面将一一介绍这两种模式的使用方式及优缺点。
订阅模式(push)
import com.rabbitmq.client.Consumer; |
订阅模式接收消息是最有效的一种消息处理方式,当消息到达broker时,broker会自动将消息投递给匹配的消费者,而不需要消费端手动去拉取。在同一个通道channel中,每个消费者Consumer都有着不同的consumer-tag标识,这个标识可以是客户端指定,也可以由broker服务端自动生成(如果客户端手动指定了,则以客户端的为准,如果没有指定则由服务端自动生成)。
实现一个Consumer,最容易的方式是继承DefaultConsumer类,重写其中的方法即可!,具体使用示例如下:boolean autoAck = false;
// 声明消费者
channel.basicConsume(queueName, autoAck, "myConsumerTag",
new DefaultConsumer(channel) {
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
long deliveryTag = envelope.getDeliveryTag();
// (process the message components here ...)
// channel.basicAck(deliveryTag, multiple);
channel.basicAck(deliveryTag, false); // 消费端确认消息处理成功
}
});
这里,因为我们关闭了autoAck = false;消息自动确认机制,那么就必须手动在handleDelivery方法中去确认消息已经消费处理成功。
我们可以通过执行Channel.basicCancel方法来显示的取消某个消费者:channel.basicCancel(consumerTag); // consumerTag 通道内消费者唯一标识
ps: 每个通道channel都有自己的调度线程,绝大多数情况下一个channel只有一个consumer,这样一个消费者在操作时就不会阻塞其他消费者;但是如果每个通道有多个消费者,请注意长期运行的消费者可能会阻塞向该通道上的其他消费者发送回调。
检索模式(pull)
通过使用Channel.basicGet显示拉取消息,返回的数据类型是GetResponse实例。boolean autoAck = false;
GetResponse response = channel.basicGet(queueName, autoAck);
if (response == null) {
// No message retrieved.
} else {
AMQP.BasicProperties props = response.getProps();
byte[] body = response.getBody();
long deliveryTag = response.getEnvelope().getDeliveryTag();
...
...
channel.basicAck(method.deliveryTag, false); // acknowledge receipt of the message
}
同样,由于这里设置了autoAck = false;我们必须手动去确认已经成功接收到了消息。